import contextlib
import faulthandler
import io
import multiprocessing
import os
import platform
import signal
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Optional


class FunctionExecution:
    TIMEOUT = 3
    WORKERS = 10

    def __init__(
        self,
    ):
        pass

    def __call__(self, doc, ground_truth, results):
        futures = []
        with ThreadPoolExecutor(max_workers=self.WORKERS) as executor:
            for completion in results:
                future = executor.submit(
                    self.check_correctness, doc, completion, self.TIMEOUT
                )
                futures.append(future)
        eval_results = [future.result() for future in as_completed(futures)]

        passed = [r["passed"] for r in eval_results]
        return passed

    @staticmethod
    def check_correctness(
        problem: Dict,
        completion: str,
        timeout: float,
        completion_id: Optional[int] = None,
        dataset="human_eval",
    ) -> Dict:
        """
        Evaluates the functional correctness of a completion by running the test
        suite provided in the problem.

        :param completion_id: an optional completion ID so we can match
            the results later even if execution finishes asynchronously.
        """

        if all(
            key in problem
            for key in [
                "text",
                "code",
                "task_id",
                "test_setup_code",
                "test_list",
                "challenge_test_list",
            ]
        ):
            dataset = "mbpp"

        def unsafe_execute():
            with create_tempdir():
                # These system calls are needed when cleaning up tempdir.
                import os
                import shutil

                rmtree = shutil.rmtree
                rmdir = os.rmdir
                chdir = os.chdir

                # Disable functionalities that can make destructive changes to the test.
                reliability_guard()

                # Construct the check program and run it.
                if dataset is None or dataset == "human_eval":
                    check_program = (
                        problem["prompt"]
                        + completion
                        + "\n"
                        + problem["test"]
                        + "\n"
                        + f"check({problem['entry_point']})"
                    )

                elif dataset == "mbpp":
                    description = problem["text"]
                    tests = "\n".join(problem["test_list"])
                    check_program = (
                        f'"""{description}\n{tests}"""\n' + completion + "\n" + tests
                    )
                try:
                    exec_globals = {}
                    with swallow_io():
                        with time_limit(timeout):
                            # WARNING
                            # This program exists to execute untrusted model-generated code. Although
                            # it is highly unlikely that model-generated code will do something overtly
                            # malicious in response to this test suite, model-generated code may act
                            # destructively due to a lack of model capability or alignment.
                            # Users are strongly encouraged to sandbox this evaluation suite so that it
                            # does not perform destructive actions on their host or network. For more
                            # information on how OpenAI sandboxes its code, see the accompanying paper.
                            # Once you have read this disclaimer and taken appropriate precautions,
                            # uncomment the following line and proceed at your own risk:
                            exec(check_program, exec_globals)
                    result.append("passed")
                except TimeoutException:
                    result.append("timed out")
                except BaseException as e:
                    result.append(f"failed: {e}")

                # Needed for cleaning up.
                shutil.rmtree = rmtree
                os.rmdir = rmdir
                os.chdir = chdir

        manager = multiprocessing.Manager()
        result = manager.list()

        p = multiprocessing.Process(target=unsafe_execute)
        p.start()
        p.join(timeout=timeout + 1)
        if p.is_alive():
            p.kill()

        if not result:
            result.append("timed out")

        return dict(
            task_id=problem["task_id"],
            passed=result[0] == "passed",
            result=result[0],
            completion_id=completion_id,
        )


@contextlib.contextmanager
def time_limit(seconds: float):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    signal.setitimer(signal.ITIMER_REAL, seconds)
    signal.signal(signal.SIGALRM, signal_handler)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)


@contextlib.contextmanager
def swallow_io():
    stream = WriteOnlyStringIO()
    with contextlib.redirect_stdout(stream):
        with contextlib.redirect_stderr(stream):
            with redirect_stdin(stream):
                yield


@contextlib.contextmanager
def create_tempdir():
    with tempfile.TemporaryDirectory() as dirname:
        with chdir(dirname):
            yield dirname


class TimeoutException(Exception):
    pass


class WriteOnlyStringIO(io.StringIO):
    """StringIO that throws an exception when it's read from"""

    def read(self, *args, **kwargs):
        raise IOError

    def readline(self, *args, **kwargs):
        raise IOError

    def readlines(self, *args, **kwargs):
        raise IOError

    def readable(self, *args, **kwargs):
        """Returns True if the IO object can be read."""
        return False


class redirect_stdin(contextlib._RedirectStream):  # type: ignore
    _stream = "stdin"


@contextlib.contextmanager
def chdir(root):
    if root == ".":
        yield
        return
    cwd = os.getcwd()
    os.chdir(root)
    try:
        yield
    except BaseException as exc:
        raise exc
    finally:
        os.chdir(cwd)


def reliability_guard(maximum_memory_bytes: Optional[int] = None):
    """
    This disables various destructive functions and prevents the generated code
    from interfering with the test (e.g. fork bomb, killing other processes,
    removing filesystem files, etc.)

    WARNING
    This function is NOT a security sandbox. Untrusted code, including, model-
    generated code, should not be blindly executed outside of one. See the
    Codex paper for more information about OpenAI's code sandbox, and proceed
    with caution.
    """

    if maximum_memory_bytes is not None:
        import resource

        resource.setrlimit(
            resource.RLIMIT_AS, (maximum_memory_bytes, maximum_memory_bytes)
        )
        resource.setrlimit(
            resource.RLIMIT_DATA, (maximum_memory_bytes, maximum_memory_bytes)
        )
        if not platform.uname().system == "Darwin":
            resource.setrlimit(
                resource.RLIMIT_STACK, (maximum_memory_bytes, maximum_memory_bytes)
            )

    faulthandler.disable()

    import builtins

    builtins.exit = None
    builtins.quit = None

    import os

    os.environ["OMP_NUM_THREADS"] = "1"

    os.kill = None
    os.system = None
    os.putenv = None
    os.remove = None
    os.removedirs = None
    os.rmdir = None
    os.fchdir = None
    os.setuid = None
    os.fork = None
    os.forkpty = None
    os.killpg = None
    os.rename = None
    os.renames = None
    os.truncate = None
    os.replace = None
    os.unlink = None
    os.fchmod = None
    os.fchown = None
    os.chmod = None
    os.chown = None
    os.chroot = None
    os.fchdir = None
    os.lchflags = None
    os.lchmod = None
    os.lchown = None
    os.getcwd = None
    os.chdir = None

    import shutil

    shutil.rmtree = None
    shutil.move = None
    shutil.chown = None

    import subprocess

    subprocess.Popen = None  # type: ignore

    __builtins__["help"] = None

    import sys

    sys.modules["ipdb"] = None
    sys.modules["joblib"] = None
    sys.modules["resource"] = None
    sys.modules["psutil"] = None
    sys.modules["tkinter"] = None
